Directed Acyclic Graph

DAG stands for Directed Acyclic Graph. In graph theory, a graph is a collection of nodes connected by branches. A directed graph is a graph in which each branch is directed from one node to another. A DAG is a directed graph that contains no cycles or loops, i.e., if you start from a node and follow the directed branches, you will never return to a node you have already visited.
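
To make the "no cycles" condition concrete, here is a minimal sketch in plain Python (not Spark code) that checks whether a directed graph is acyclic using Kahn's topological-sort algorithm. The function name `is_dag` and the sample graphs are made up for illustration.

```python
from collections import deque

def is_dag(edges, nodes):
    """Return True if the directed graph has no cycle (Kahn's algorithm)."""
    indegree = {n: 0 for n in nodes}
    successors = {n: [] for n in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    # Start from the nodes that have no incoming branch.
    ready = deque(n for n in nodes if indegree[n] == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Nodes on a cycle never reach in-degree 0, so they are never visited.
    return visited == len(nodes)

nodes = ["A", "B", "C", "D"]
acyclic = [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")]
cyclic = acyclic + [("D", "A")]   # the extra branch D -> A closes a loop

print(is_dag(acyclic, nodes))  # True
print(is_dag(cyclic, nodes))   # False
```

The second graph fails the check because following the branches from A eventually leads back to A.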

What does DAG mean in Apache Spark?
The Spark driver builds a logical flow of operations that can be represented as a graph which is directed and acyclic, also known as a DAG (Directed Acyclic Graph).
  • It is a graph that keeps track of the operations applied to an RDD.
  • The DAG scheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling.
  • It transforms a logical execution plan into a physical execution plan. Once an action has been called, SparkContext hands the logical plan over to the DAG scheduler, which translates it into a set of stages; each stage is submitted as a task set for execution.
  • Spark pipelines operations to optimize its work: consecutive transformations that need no shuffle are combined into a single stage. The basic job of the DAG scheduler is to keep track of jobs and stages.
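
The pipelining idea from the last point can be sketched in plain Python with lazy iterators. This is an illustration only, not Spark's implementation; the helper `pipelined` and the sample steps are hypothetical.

```python
def pipelined(partition, steps):
    """Apply a chain of narrow steps to one partition, one record at a time."""
    stream = iter(partition)
    for kind, fn in steps:
        if kind == "map":
            stream = map(fn, stream)        # lazy: nothing runs yet
        elif kind == "filter":
            stream = filter(fn, stream)     # lazy as well
        else:
            raise ValueError(f"unsupported step: {kind}")
    # A single pass pulls each record through all steps,
    # so no intermediate list is built between the steps.
    return list(stream)

steps = [
    ("map", lambda x: x * 10),
    ("filter", lambda x: x % 20 == 0),
    ("map", lambda x: x + 1),
]
partitions = [[1, 2, 3], [4, 5, 6]]

result = [pipelined(p, steps) for p in partitions]
print(result)  # [[21], [41, 61]]
```

Each partition is processed independently, which is why such a chain can run as one stage without moving data between partitions.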

Why does Spark need an acyclic graph?
The limitations of Hadoop MapReduce were a key reason for introducing the DAG in Spark. A MapReduce computation runs in three steps:
  • The data is read from HDFS.
  • Map and Reduce operations are applied.
  • The computed result is written back to HDFS.
Each MapReduce job is independent of the others, and Hadoop has no idea which MapReduce job will come next. In iterative computations, writing the intermediate result to HDFS between two MapReduce jobs and reading it back is often unnecessary, and in such cases stable storage (HDFS or local disk) is wasted. In a multi-step computation, every job is blocked until the previous job has completed. As a result, a complex computation can take a long time even with a small data volume.

In Spark, by contrast, a DAG (Directed Acyclic Graph) of consecutive computation stages is formed. This allows the execution plan to be optimized as a whole, e.g. to minimize shuffling data around. In MapReduce, this has to be done manually by tuning each MapReduce step.

The DAG scheduler works through the following steps:
  • It computes the DAG of stages for each job, keeps track of which RDDs and stage outputs have been materialized, finds a minimal schedule to run the job, and hands the stages to the task scheduler. The task scheduler is responsible for submitting the tasks for execution.
  • It determines the preferred locations on which to run each task, based on the current cache status, and passes this information to the task scheduler.
  • It keeps track of which RDDs are cached to avoid recomputing them. It also handles failures: shuffle output files may be lost, and because the scheduler remembers which stages have already produced output files, it can rerun the stages whose output was lost.

How is Fault Tolerance achieved through Spark DAG?
As we know, the DAG keeps a record of the operations applied to an RDD, including the details of the tasks executed on the different partitions of that RDD. So if part of an RDD is lost in a failure, it can be recovered with the help of the DAG. For example, suppose an operation is in progress and a node holding part of an RDD suddenly crashes. With the help of the cluster manager, Spark identifies the partition in which the loss occurred. Then, through the DAG, that partition is assigned to another node to recover the lost data. The new node operates on that particular partition of the RDD and re-executes the series of operations that produced it.
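
The recovery idea can be sketched in plain Python. The class `LineageRDD` below is a toy stand-in invented for this illustration (it is not the Spark API): each dataset remembers its parent and the function applied to it, so a lost partition can be rebuilt by replaying those operations.

```python
class LineageRDD:
    """Toy stand-in for an RDD: remembers its parent and the function applied."""

    def __init__(self, source=None, parent=None, fn=None):
        self.source = source      # list of partitions, only set on the root
        self.parent = parent      # upstream LineageRDD, None for the root
        self.fn = fn              # per-record function applied to the parent
        self.cache = {}           # partition index -> computed records

    def map(self, fn):
        return LineageRDD(parent=self, fn=fn)

    def compute(self, index):
        """Return one partition, recomputing it from lineage if it is missing."""
        if index not in self.cache:
            if self.parent is None:
                # Root: re-read the partition from the (assumed stable) source.
                records = list(self.source[index])
            else:
                records = [self.fn(r) for r in self.parent.compute(index)]
            self.cache[index] = records
        return self.cache[index]

base = LineageRDD(source=[[1, 2], [3, 4]])
doubled = base.map(lambda x: x * 2)
shifted = doubled.map(lambda x: x + 1)

before = shifted.compute(1)

# Simulate a node failure that loses partition 1 at every level.
for rdd in (shifted, doubled, base):
    rdd.cache.pop(1, None)

after = shifted.compute(1)   # rebuilt by replaying the recorded operations
print(before, after)         # [7, 9] [7, 9]
```

Only the lost partition is recomputed; partition 0 is never touched in this run.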

How does Spark build a DAG?
The following steps describe how Spark creates a DAG:
  • First, the user submits an Apache Spark application to Spark.
  • Then the driver module takes the application from the Spark side.
  • The driver performs several tasks on the application that help to identify which transformations and actions are present in it.
  • All the operations are then arranged into a logical flow of operations; that arrangement is the DAG.
  • Then the DAG is converted into a physical execution plan, which contains stages.
  • As discussed earlier, the driver identifies transformations. It also sets stage boundaries according to the nature of each transformation.
  • A wide transformation requires data shuffling, so it results in a stage boundary.
  • Finally, the DAG scheduler produces a physical execution plan that contains tasks. Those tasks are then bundled together and sent over the cluster.
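
The key point in these steps is that transformations are only recorded, and nothing executes until an action is called. A minimal plain-Python sketch of that behaviour follows; `LazyDataset` is a hypothetical toy class, not part of Spark.

```python
class LazyDataset:
    """Toy model: transformations are only recorded; an action runs them."""

    def __init__(self, data, plan=()):
        self.data = data
        self.plan = plan          # recorded logical flow of operations

    def map(self, fn):
        return LazyDataset(self.data, self.plan + (("map", fn),))

    def filter(self, fn):
        return LazyDataset(self.data, self.plan + (("filter", fn),))

    def collect(self):
        """Action: only now is the recorded plan executed."""
        records = list(self.data)
        for kind, fn in self.plan:
            if kind == "map":
                records = [fn(r) for r in records]
            else:
                records = [r for r in records if fn(r)]
        return records

calls = []

def square(x):
    calls.append(x)               # record each call to see when work happens
    return x * x

ds = LazyDataset([1, 2, 3, 4]).map(square).filter(lambda x: x > 4)
planned = [kind for kind, _ in ds.plan]
calls_before_action = len(calls)
output = ds.collect()

print(planned)               # ['map', 'filter']
print(calls_before_action)   # 0
print(output)                # [9, 16]
print(len(calls))            # 4
```

Because the whole plan is known before anything runs, a scheduler has the chance to decide on stages and tasks first.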


Here, you can see that Spark created the DAG for the program written above and divided the DAG into two stages. In this DAG, you can see a clear picture of the program. First, the text file is read. Then, transformations such as map and flatMap are applied. Finally, reduceByKey is executed.

But why did Spark divide this program into two stages? Why not more or fewer than two? Basically, it depends on shuffling: whenever you perform a transformation for which Spark needs to shuffle the data by communicating with other partitions, it creates a new stage for that transformation. If a transformation does not require shuffling your data, Spark keeps it in the same stage.
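
As a rough illustration of this rule, the plain-Python sketch below cuts a linear chain of operations into stages at every shuffle. The set `WIDE` and the function `split_into_stages` are assumptions made for this example; they are not an exhaustive classification and not Spark's scheduler code.

```python
# Assumed set of shuffle-causing operations for this sketch (not exhaustive).
WIDE = {"reduceByKey", "groupByKey", "join", "repartition"}

def split_into_stages(operations):
    """Group a linear chain of operations into stages, cutting at each shuffle."""
    stages = [[]]
    for op in operations:
        if op in WIDE and stages[-1]:
            stages.append([])     # the shuffle starts a new stage
        stages[-1].append(op)
    return stages

word_count = ["textFile", "flatMap", "map", "reduceByKey"]
stages = split_into_stages(word_count)
print(stages)
# [['textFile', 'flatMap', 'map'], ['reduceByKey']]
```

With the operations named in the text (read a text file, flatMap, map, reduceByKey), the only shuffle is at reduceByKey, which gives two stages.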

Now, let's have a look at how many tasks have been created by Spark:


As I mentioned earlier, the Spark driver divides DAG stages into tasks. Here, you can see that each stage is divided into two tasks. But why did Spark create only two tasks for each stage? It depends on the number of partitions. In this program, we have only two partitions, so each stage is divided into two tasks, and a single task runs on a single partition. When every stage has the same number of partitions, the number of tasks for a job is:
(number of stages * number of partitions)
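
A quick arithmetic check of this formula in Python. The helper `total_tasks` is hypothetical; it uses the more general reading that each stage runs one task per partition, so stages with different partition counts are simply summed.

```python
def total_tasks(partitions_per_stage):
    """One task per partition, summed over all stages of the job."""
    return sum(partitions_per_stage)

# The example from the text: two stages with two partitions each.
print(total_tasks([2, 2]))   # 4, the same as 2 stages * 2 partitions

# If a shuffle changes the partition count, the stages contribute differently.
print(total_tasks([8, 2]))   # 10
```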

Why is a new stage formed where data is shuffled?
DAGScheduler splits up a job into a collection of stages. Each stage contains a sequence of narrow transformations (which can be completed without shuffling the entire data set), and stages are separated at shuffle boundaries, i.e. where a shuffle occurs. Stages are thus the result of breaking the RDD graph at shuffle boundaries. A shuffle boundary introduces a barrier: the tasks of the next stage must wait for the previous stage to finish before they fetch its map outputs. There are two advantages of breaking a job into stages:
  • A new stage is created after every shuffle operation, so that when data is lost during a shuffle (network I/O), only the previous stage has to be recomputed for fault tolerance.
  • Spark can group the operations that do not need to share data between executors and execute them in one go. (Data has to be shared when one partition requires data from another partition to complete an operation such as groupBy.)
So, after DAGScheduler has done its work of converting the job into stages, it hands each stage over to TaskScheduler, which carries out the rest of the computation.

Why is the Spark DAG needed?
In Hadoop MapReduce, computations take place in three steps:
  • First, data is read from HDFS (Hadoop Distributed File System) every time it is needed.
  • After that, two transformation operations, map and reduce, are applied.
  • In the third step, the computed result is written back to HDFS.
Because each operation is independent of the others, there is no linkage between them. This becomes a problem when two MapReduce jobs have to be handled together: much storage is wasted, and computations take a long time even with small data volumes. Spark therefore automatically forms a DAG, a logical flow of operations. This helps to minimize data shuffling, which shortens computations and makes the process more efficient.

Working with the DAG optimizer in Spark
A DAG can be optimized by rearranging and combining operators wherever possible. The DAG optimizer rearranges the order of operators to reduce the number of records that later operations have to process. For example, take two operations such as map() and filter() in a Spark job. Where the result stays the same, the optimizer can move the filter ahead of the map, since filtering first reduces the number of records that go through the map operation.
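
Why the order matters can be shown with a small plain-Python experiment that counts how often the map function is called. The function names are invented for this sketch, and it assumes the filter depends only on the original record, which is what makes the reordering safe. Whether Spark applies such a rewrite automatically depends on the API in use; the sketch only shows the effect.

```python
def run(records, order):
    """Run map and filter in the given order and count the map invocations."""
    map_calls = 0

    def expensive_map(x):
        nonlocal map_calls
        map_calls += 1
        return (x, x * x)

    def keep_even(x):
        return x % 2 == 0

    if order == "map_then_filter":
        mapped = [expensive_map(x) for x in records]
        result = [pair for pair in mapped if keep_even(pair[0])]
    else:  # "filter_then_map"
        result = [expensive_map(x) for x in records if keep_even(x)]
    return result, map_calls

records = list(range(10))
slow, slow_calls = run(records, "map_then_filter")
fast, fast_calls = run(records, "filter_then_map")

print(slow == fast)            # True
print(slow_calls, fast_calls)  # 10 5
```

Both orders give the same result, but filtering first halves the number of map calls for this input.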

Advantages of DAG in Spark
The DAG has turned out to be beneficial in several ways. Some of them are listed below:

  • It is possible to execute many queries at the same time through a DAG. MapReduce offers only two operations (map and reduce), so it cannot directly express an SQL query, whereas this is possible on a DAG. This makes a DAG more flexible than MapReduce.
  • Fault tolerance can be achieved through the DAG: lost RDDs can be recovered using this graph.
  • In comparison to Hadoop MapReduce, a DAG provides better global optimization.

What is a Parquet file?
Parquet is a columnar file format supported by many data processing systems. Spark SQL performs both read and write operations on Parquet files and considers it to be one of the best big data analytics formats so far. The advantages of columnar storage are as follows:
  • Columnar storage limits IO operations.
  • It can fetch specific columns that you need to access.
  • Columnar storage consumes less space.
  • It gives better-summarized data and follows type-specific encoding.
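
The first two points can be illustrated with a toy comparison in plain Python. This is not the Parquet format itself, only a sketch of row layout versus column layout; the sample records and both helper functions are made up.

```python
rows = [
    {"id": 1, "name": "a", "score": 0.5},
    {"id": 2, "name": "b", "score": 0.7},
    {"id": 3, "name": "c", "score": 0.9},
]

# Columnar layout: one list per column instead of one record per row.
columns = {key: [row[key] for row in rows] for key in rows[0]}

def read_column_from_rows(rows, wanted):
    """Row layout: every field of every row is visited to extract one column."""
    touched = 0
    values = []
    for row in rows:
        for key, value in row.items():
            touched += 1
            if key == wanted:
                values.append(value)
    return values, touched

def read_column_from_columns(columns, wanted):
    """Columnar layout: only the requested column is read."""
    values = columns[wanted]
    return values, len(values)

print(read_column_from_rows(rows, "score"))        # ([0.5, 0.7, 0.9], 9)
print(read_column_from_columns(columns, "score"))  # ([0.5, 0.7, 0.9], 3)
```

Reading one column out of three touches a third of the values in the columnar layout, which is the intuition behind the reduced I/O.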

If your cluster has limited resources and many applications need to run, how would you ensure that your Spark application takes a fixed amount of resources and hence does not impact the execution of other applications?

While submitting the Spark application, pass these two parameters:
--num-executors 10
--conf spark.dynamicAllocation.enabled=false
